OPC Studio User's Guide and Reference
Installed Examples - Console - SimpleTrillApplication

This example shows how to query real-time OPC UA data using Trill.

The main program:

// Shows how to query real-time OPC UA data using Trill. The query looks for event sequences where the previous value is
// less than 42, and the current value is greater than or equal to 42.
//
// Find all latest examples here: https://opclabs.doc-that.com/files/onlinedocs/OPCLabs-OpcStudio/Latest/examples.html .

using System;
using System.Reactive.Linq;
using Microsoft.StreamProcessing;
using OpcLabs.EasyOpc.UA;
using OpcLabs.EasyOpc.UA.Generic;
using OpcLabs.EasyOpc.UA.Reactive;

namespace SimpleTrillApplication
{
    public sealed class Program
    {
        public static void Main(string[] args)
        {
            // Define which server we will work with.
            UAEndpointDescriptor endpointDescriptor =
                "opc.tcp://opcua.demo-this.com:51210/UA/SampleServer";
                // or "http://opcua.demo-this.com:51211/UA/SampleServer" (currently not supported)
                // or "https://opcua.demo-this.com:51212/UA/SampleServer/"

            Console.WriteLine("Creating source observable...");
            // Create a data change observable for specified OPC UA node value.
            IObservable<UAAttributeData<byte>> sourceObservable = UADataChangeNotificationObservable
                .Create<byte>(endpointDescriptor, "nsu=http://test.org/UA/Data/ ;i=10846", 200)
                .Where(eventArgs => eventArgs.Succeeded)    // ignore events that carry failures
                .Select(eventArgs => eventArgs.TypedAttributeData);

            Console.WriteLine("Creating input streamable...");
            // Load the observable as a stream in Trill, injecting a punctuation every second. Because we use
            // FlushPolicy.FlushOnPunctuation, this will also flush the data every second.
            IObservableIngressStreamable<UAAttributeData<byte>> inputStreamable =
                sourceObservable.Select(attributeData => 
                        StreamEvent.CreateStart(attributeData.SourceTimestamp.Ticks, attributeData))
                    .ToStreamable(
                        DisorderPolicy.Drop(),
                        FlushPolicy.FlushOnPunctuation,
                        PeriodicPunctuationPolicy.Time((ulong)TimeSpan.FromSeconds(1).Ticks));

            // We will be building a query that takes a stream of UAAttributeData<byte> events.
            Console.WriteLine("Creating the query...");
            // Query: look for pattern of [value < 42] --> [value >= 42]
            IStreamable<Empty, Tuple<byte, byte>> query = inputStreamable
                .AlterEventDuration(TimeSpan.FromSeconds(10).Ticks)
                .Detect(
                    default(Tuple<byte, byte>), // register to store the value
                    pattern => pattern
                        .SingleElement(e => e.TypedValue < 42, (ts, ev, reg) => new Tuple<byte, byte>(ev.TypedValue, 0))
                        .SingleElement(e => e.TypedValue >= 42, (ts, ev, reg) => new Tuple<byte, byte>(reg.Item1, ev.TypedValue)));

            Console.WriteLine("Running the query for 20 seconds...");
            query.ToStreamEventObservable()
                .Take(TimeSpan.FromSeconds(20))
                .ForEachAsync(streamEvent => Console.WriteLine(streamEvent)).Wait();

            Console.WriteLine("Finished.");
        }



        // Example output:
        //Creating source observable...
        //Creating input streamable...
        //Creating the query...
        //Running the query for 20 seconds...
        //[Punctuation: 637080168490000000]
        //[Interval: 637080168493266740 - 637080168591235679, (17, 76)]
        //[Punctuation: 637080168500000000]
        //[Interval: 637080168509585008 - 637080168607485434, (28, 197)]
        //[Punctuation: 637080168510000000]
        //[Punctuation: 637080168520000000]
        //[Punctuation: 637080168530000000]
        //[Punctuation: 637080168540000000]
        //[Interval: 637080168540136670 - 637080168638105248, (23, 108)]
        //[Punctuation: 637080168550000000]
        //[Interval: 637080168556412772 - 637080168654381015, (20, 157)]
        //[Punctuation: 637080168560000000]
        //[Interval: 637080168560631254 - 637080168658599730, (36, 53)]
        //[Punctuation: 637080168570000000]
        //[Interval: 637080168572974888 - 637080168670943594, (34, 113)]
        //[Punctuation: 637080168580000000]
        //[Interval: 637080168589224991 - 637080168687193550, (25, 176)]
        //[Punctuation: 637080168590000000]
        //[Punctuation: 637080168600000000]
        //[Interval: 637080168605474719 - 637080168703443726, (1, 161)]
        //[Punctuation: 637080168610000000]
        //[Interval: 637080168611568222 - 637080168709537000, (32, 222)]
        //[Punctuation: 637080168620000000]
        //[Punctuation: 637080168630000000]
        //[Punctuation: 637080168640000000]
        //[Interval: 637080168644224600 - 637080168742193693, (24, 87)]
        //[Punctuation: 637080168650000000]
        //[Punctuation: 637080168660000000]
        //[Interval: 637080168660475491 - 637080168758444908, (40, 230)]
        //[Punctuation: 637080168670000000]
        //Finished.
    }
}

 

See Also